1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.google.common.util.concurrent;
18
19 import static com.google.common.base.Preconditions.checkState;
20 import static com.google.common.util.concurrent.Futures.immediateFuture;
21 import static com.google.common.util.concurrent.JdkFutureAdapters.listenInPoolThread;
22 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23 import static java.util.concurrent.Executors.newCachedThreadPool;
24 import static java.util.concurrent.TimeUnit.SECONDS;
25
26 import com.google.common.testing.ClassSanityTester;
27 import com.google.common.util.concurrent.FuturesTest.ExecutorSpy;
28 import com.google.common.util.concurrent.FuturesTest.SingleCallListener;
29
30 import junit.framework.AssertionFailedError;
31 import junit.framework.TestCase;
32
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.SynchronousQueue;
37 import java.util.concurrent.ThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39
40
41
42
43
44
45
46 public class JdkFutureAdaptersTest extends TestCase {
47 private static final String DATA1 = "data";
48
49 public void testListenInPoolThreadReturnsSameFuture() throws Exception {
50 ListenableFuture<String> listenableFuture = immediateFuture(DATA1);
51 assertSame(listenableFuture, listenInPoolThread(listenableFuture));
52 }
53
54 public void testListenInPoolThreadIgnoresExecutorWhenDelegateIsDone()
55 throws Exception {
56 NonListenableSettableFuture<String> abstractFuture =
57 NonListenableSettableFuture.create();
58 abstractFuture.set(DATA1);
59 ExecutorSpy spy = new ExecutorSpy(directExecutor());
60 ListenableFuture<String> listenableFuture =
61 listenInPoolThread(abstractFuture, spy);
62
63 SingleCallListener singleCallListener = new SingleCallListener();
64 singleCallListener.expectCall();
65
66 assertFalse(spy.wasExecuted);
67 assertFalse(singleCallListener.wasCalled());
68 assertTrue(listenableFuture.isDone());
69
70
71
72 listenableFuture.addListener(singleCallListener, directExecutor());
73 assertEquals(DATA1, listenableFuture.get());
74
75
76
77 assertFalse(spy.wasExecuted);
78 assertTrue(singleCallListener.wasCalled());
79 assertTrue(listenableFuture.isDone());
80 }
81
82 public void testListenInPoolThreadUsesGivenExecutor() throws Exception {
83 ExecutorService executorService = newCachedThreadPool(
84 new ThreadFactoryBuilder().setDaemon(true).build());
85 NonListenableSettableFuture<String> abstractFuture =
86 NonListenableSettableFuture.create();
87 ExecutorSpy spy = new ExecutorSpy(executorService);
88 ListenableFuture<String> listenableFuture =
89 listenInPoolThread(abstractFuture, spy);
90
91 SingleCallListener singleCallListener = new SingleCallListener();
92 singleCallListener.expectCall();
93
94 assertFalse(spy.wasExecuted);
95 assertFalse(singleCallListener.wasCalled());
96 assertFalse(listenableFuture.isDone());
97
98 listenableFuture.addListener(singleCallListener, executorService);
99 abstractFuture.set(DATA1);
100 assertEquals(DATA1, listenableFuture.get());
101 singleCallListener.waitForCall();
102
103 assertTrue(spy.wasExecuted);
104 assertTrue(singleCallListener.wasCalled());
105 assertTrue(listenableFuture.isDone());
106 }
107
108 public void testListenInPoolThreadCustomExecutorInterrupted()
109 throws Exception {
110 final CountDownLatch submitSuccessful = new CountDownLatch(1);
111 ExecutorService executorService = new ThreadPoolExecutor(
112 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
113 new SynchronousQueue<Runnable>(),
114 new ThreadFactoryBuilder().setDaemon(true).build()) {
115 @Override
116 protected void beforeExecute(Thread t, Runnable r) {
117 submitSuccessful.countDown();
118 }
119 };
120 NonListenableSettableFuture<String> abstractFuture =
121 NonListenableSettableFuture.create();
122 ListenableFuture<String> listenableFuture =
123 listenInPoolThread(abstractFuture, executorService);
124
125 SingleCallListener singleCallListener = new SingleCallListener();
126 singleCallListener.expectCall();
127
128 assertFalse(singleCallListener.wasCalled());
129 assertFalse(listenableFuture.isDone());
130
131 listenableFuture.addListener(singleCallListener, directExecutor());
132
133
134
135
136
137 submitSuccessful.await();
138 executorService.shutdownNow();
139 abstractFuture.set(DATA1);
140 assertEquals(DATA1, listenableFuture.get());
141 singleCallListener.waitForCall();
142
143 assertTrue(singleCallListener.wasCalled());
144 assertTrue(listenableFuture.isDone());
145 }
146
147
148
149
150
151 private static final class NonListenableSettableFuture<V>
152 extends ForwardingFuture<V> {
153 static <V> NonListenableSettableFuture<V> create() {
154 return new NonListenableSettableFuture<V>();
155 }
156
157 final SettableFuture<V> delegate = SettableFuture.create();
158
159 @Override protected Future<V> delegate() {
160 return delegate;
161 }
162
163 void set(V value) {
164 delegate.set(value);
165 }
166 }
167
168 private static final class RuntimeExceptionThrowingFuture<V>
169 implements Future<V> {
170 final CountDownLatch allowGetToComplete = new CountDownLatch(1);
171
172 @Override
173 public boolean cancel(boolean mayInterruptIfRunning) {
174 throw new AssertionFailedError();
175 }
176
177 @Override
178 public V get() throws InterruptedException {
179
180
181
182
183 allowGetToComplete.await(1, SECONDS);
184 throw new RuntimeException("expected, should be caught");
185 }
186
187 @Override
188 public V get(long timeout, TimeUnit unit) {
189 throw new AssertionFailedError();
190 }
191
192 @Override
193 public boolean isCancelled() {
194 throw new AssertionFailedError();
195 }
196
197 @Override
198 public boolean isDone() {
199
200
201
202
203
204
205 return false;
206 }
207 }
208
209 private static final class RecordingRunnable implements Runnable {
210 final CountDownLatch wasRun = new CountDownLatch(1);
211
212
213 @Override
214 public synchronized void run() {
215 checkState(wasRun.getCount() > 0);
216 wasRun.countDown();
217 }
218 }
219
220 public void testListenInPoolThreadRunsListenerAfterRuntimeException()
221 throws Exception {
222 RuntimeExceptionThrowingFuture<String> input =
223 new RuntimeExceptionThrowingFuture<String>();
224
225
226
227
228
229 assertFalse("Can't test the main listenInPoolThread path "
230 + "if the input is already a ListenableFuture",
231 ListenableFuture.class.isInstance(input));
232 ListenableFuture<String> listenable = listenInPoolThread(input);
233
234
235
236
237 RecordingRunnable earlyListener = new RecordingRunnable();
238 listenable.addListener(earlyListener, directExecutor());
239
240 input.allowGetToComplete.countDown();
241
242 assertTrue(earlyListener.wasRun.await(1, SECONDS));
243
244
245 RecordingRunnable lateListener = new RecordingRunnable();
246 listenable.addListener(lateListener, directExecutor());
247 assertTrue(lateListener.wasRun.await(1, SECONDS));
248 }
249
250 public void testAdapters_nullChecks() throws Exception {
251 new ClassSanityTester()
252 .forAllPublicStaticMethods(JdkFutureAdapters.class)
253 .thatReturn(Future.class)
254 .testNulls();
255 }
256 }